package com.atuguigu.flink.Day01.Example;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Small Flink streaming job: reads sensor readings from {@link SensorSource},
 * partitions them by sensor id, and prints a rolling per-key reduction that
 * keeps the smaller of the two temperatures being combined.
 */
public class KeyedExample {

    /**
     * Builds the pipeline and submits it for execution.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with a single parallel instance.
        env.setParallelism(1);

        KeySelector<SendsorReading, String> bySensorId = new SensorIdSelector();
        ReduceFunction<SendsorReading> keepMinTemperature = new MinTemperatureReducer();

        // Reducing a keyed stream yields an ordinary (non-keyed) stream again.
        env.addSource(new SensorSource())
                .keyBy(bySensorId)
                .reduce(keepMinTemperature)
                .print();

        env.execute();
    }

    /** Uses the reading's {@code id} field as the partitioning key. */
    private static final class SensorIdSelector implements KeySelector<SendsorReading, String> {
        @Override
        public String getKey(SendsorReading value) throws Exception {
            return value.id;
        }
    }

    /**
     * Combines two readings into a new one carrying the first reading's id and
     * the lower of the two temperatures.
     */
    private static final class MinTemperatureReducer implements ReduceFunction<SendsorReading> {
        @Override
        public SendsorReading reduce(SendsorReading t1, SendsorReading t2) throws Exception {
            double lowest = Math.min(t1.temperture, t2.temperture);
            // NOTE(review): the third constructor argument is fixed at 0L; it is
            // presumably the timestamp — confirm against SendsorReading.
            return new SendsorReading(t1.id, lowest, 0L);
        }
    }
}
